/*
 * Copyright 2010 david varnes.
 *
 * Licensed under the Apache License, version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.freeswitch.esl.client.inbound;

import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.huilan.base.esl.service.exeception.TimeoutException;
import com.huilan.base.esl.service.fs.EslClient;
import lombok.extern.slf4j.Slf4j;
import org.freeswitch.esl.client.IEslEventListener;
import org.freeswitch.esl.client.internal.IEslProtocolListener;
import org.freeswitch.esl.client.transport.CommandResponse;
import org.freeswitch.esl.client.transport.SendMsg;
import org.freeswitch.esl.client.transport.event.EslEvent;
import org.freeswitch.esl.client.transport.message.EslMessage;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
public class Client {

    /** Names the threads of the event listener pool {@code esl-event-pool-N}. */
    private final ThreadFactory eslThreadFactory = new ThreadFactoryBuilder().setNameFormat("esl-event-pool-%d").build();

    // Thread pool that runs the ESL event listeners.
    // 1. Tune the core thread count to the business load.
    // 2. Keep the queue small so that events never wait long.
    // 3. The thread names show whether the queue has ever filled up (listener exceptions must not
    //    escape into the pool, otherwise the core thread is destroyed and recreated and the thread
    //    number increases by one); when the queue fills up, raise the core size.
    // Note: with AbortPolicy, execute() throws RejectedExecutionException once queue and pool are full.
    private final Executor eventListenerExecutor = new ThreadPoolExecutor(48, 500,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(16),eslThreadFactory,new ThreadPoolExecutor.AbortPolicy());

    /** Set to {@code true} by the protocol listener once an authentication reply has arrived. */
    private final AtomicBoolean authenticatorResponded = new AtomicBoolean(false);
    // The three fields below are written by one thread (protocol listener callback / connect())
    // and read by others (connect(), canSend(), the send methods); volatile guarantees visibility.
    private volatile boolean authenticated;
    private volatile CommandResponse authenticationResponse;
    private volatile Channel channel;
    private final ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("boss-pool-%d").build();
    private final ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("worker-pool-%d").build();

    // Netty boss/worker pools and channel factory; (re)created on every connect().
    private ExecutorService bossPool = null;
    private ExecutorService workerPool = null;
    private NioClientSocketChannelFactory nioClientSocketChannelFactory = null;

    // Sizing of the pools created in connect(): boss maximum, worker core and worker maximum.
    private int bossMaxNumber = 1;
    private int workerMinNumber = Runtime.getRuntime().availableProcessors() * 2;
    private int workerMaxNumber = 96;

    /** Pending bgapi callbacks keyed by job id; filled by sendAsyncApiCommand, drained by handle(). */
    private final Map<String,SyncCallback> map = new ConcurrentHashMap<>();

    /**
     * Creates an unconnected client. The send methods throw {@link IllegalStateException}
     * until {@link #connect(String, int, String, int)} has completed successfully.
     */
    public Client() {
        // Nothing to do: every field has an inline initialiser or a default value.
    }

    /**
     * Reports whether commands can be sent right now.
     *
     * @return {@code true} only when a channel exists, it is connected and authentication succeeded
     */
    public boolean canSend() {
        if (channel == null) {
            return false;
        }
        if (!channel.isConnected()) {
            return false;
        }
        return authenticated;
    }

    /**
     * Connects to the FreeSWITCH event socket and waits for the authentication handshake.
     * <p>
     * Any existing connection is shut down first and the Netty thread pools are recreated.
     *
     * @param host           server host name or address
     * @param port           server port
     * @param password       password handed to the {@link InboundClientHandler}
     * @param timeoutSeconds maximum time to wait for the TCP connection attempt
     * @throws InboundConnectionFailure if the connection attempt times out or fails, if the channel
     *                                  is closed before an authentication reply arrives, or if
     *                                  authentication is rejected
     */
    public void connect(String host, int port, String password, int timeoutSeconds) throws InboundConnectionFailure {
        // If already connected, disconnect first
        if (canSend()) {
            close();
        } else {
            // canSend() is false, but the channel may still be open or connected
            closeChannel();
        }
        if (nioClientSocketChannelFactory != null) {
            nioClientSocketChannelFactory.releaseExternalResources();
        }

        // Forget the outcome of any earlier handshake; otherwise a reconnect would skip the
        // wait loop below and report the previous connection's authentication result.
        authenticatorResponded.set(false);
        authenticated = false;
        authenticationResponse = null;

        bossPool = new ThreadPoolExecutor(1, bossMaxNumber,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(128), bossThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        workerPool = new ThreadPoolExecutor(workerMinNumber, workerMaxNumber,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2048), workerThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        nioClientSocketChannelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);

        // Configure this client
        ClientBootstrap bootstrap = new ClientBootstrap(nioClientSocketChannelFactory);

        // Add ESL handler and factory
        InboundClientHandler handler = new InboundClientHandler(password, protocolListener);
        handler.setClient(this);
        bootstrap.setPipelineFactory(new InboundPipelineFactory(handler));

        // Attempt connection
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        // Wait till attempt succeeds, fails or timeouts
        if (!future.awaitUninterruptibly(timeoutSeconds, TimeUnit.SECONDS)) {
            throw new InboundConnectionFailure("Timeout connecting to " + host + ":" + port);
        }
        // Did not timeout
        Channel connectedChannel = future.getChannel();
        channel = connectedChannel;
        // But may have failed anyway
        if (!future.isSuccess()) {
            log.warn("Failed to connect to [{}:{}]", host, port);
            log.warn("  * reason: {}", future.getCause());

            channel = null;
            bootstrap.releaseExternalResources();

            throw new InboundConnectionFailure("Could not connect to " + host + ":" + port, future.getCause());
        }

        // Wait for the authentication handshake to call back. An interrupt does not abort the
        // wait (as before), but it is remembered and restored instead of being lost.
        boolean interrupted = false;
        try {
            while (!authenticatorResponded.get()) {
                // No reply can arrive on a dead channel; fail instead of spinning forever.
                if (!connectedChannel.isConnected() && !authenticatorResponded.get()) {
                    throw new InboundConnectionFailure("Connection to " + host + ":" + port
                            + " closed before authentication completed");
                }
                try {
                    Thread.sleep(250);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (!authenticated) {
            CommandResponse response = authenticationResponse;
            String reason = response != null ? response.getReplyText() : "no response received";
            throw new InboundConnectionFailure("Authentication failed: " + reason);
        }
    }


    /**
     * Sends a blocking {@code api} command and returns the raw reply.
     *
     * @param command API command name; when null or empty the {@code "api "} prefix is not added
     * @param arg     command arguments; appended after a space when non-empty
     * @return the {@link EslMessage} returned by the handler
     * @throws IllegalStateException if the client is not connected and authenticated
     * @deprecated presumably superseded by the {@code bgapi}-based
     *             {@link #sendAsyncApiCommand(String, String)} - confirm with the maintainers
     */
    @Deprecated
    public EslMessage sendSyncApiCommand(String command, String arg) {
        checkConnected();
        InboundClientHandler handler = (InboundClientHandler) channel.getPipeline().getLast();
        StringBuilder sb = new StringBuilder();
        if (command != null && !command.isEmpty()) {
            sb.append("api ");
            sb.append(command);
        }
        if (arg != null && !arg.isEmpty()) {
            sb.append(' ');
            sb.append(arg);
        }
        String commandLine = sb.toString();
        EslMessage response = handler.sendSyncSingleLineCommand(channel, commandLine);
        // Logging must never fail the call: a reply without body lines is logged as empty.
        List<String> bodyLines = response.getBodyLines();
        String firstBodyLine = (bodyLines == null || bodyLines.isEmpty()) ? "" : bodyLines.get(0);
        log.info("fs sendSyncApiCommand :{} result  :{}",commandLine,firstBodyLine);
        return response;
    }

    /**
     * Sends a {@code bgapi} command and waits for its result.
     *
     * @param command API command name
     * @param arg     command arguments
     * @return whatever the three-argument overload returns with {@code skipResult = false}
     */
    public List<String> sendAsyncApiCommand(String command, String arg) {
        final boolean skipResult = false;
        return sendAsyncApiCommand(command, arg, skipResult);
    }
    /**
     * Sends a {@code bgapi} command. The plain {@code api} mode is no longer used: every command
     * goes through {@code bgapi} and the result message is received by this client itself.
     *
     * @param command    API command name
     * @param arg        command arguments
     * @param skipResult passed through to the four-argument overload
     * @return whatever the four-argument overload returns with a {@code time} of 2
     * @author liuwc
     */
    public List<String> sendAsyncApiCommand(String command, String arg,boolean skipResult) {
        final int defaultTime = 2;
        return sendAsyncApiCommand(command, arg, skipResult, defaultTime);
    }

    /**
     * Sends a {@code bgapi} command and optionally waits for the job result delivered through
     * {@link #handle(String, List)}.
     *
     * @param command    API command name; when null or empty the {@code "bgapi "} prefix is not added
     * @param arg        command arguments; appended after a space when non-empty
     * @param skipResult when {@code true}, return immediately with a single-element list holding
     *                   the job id; when {@code false}, block until the result arrives
     * @param time       maximum number of seconds to wait for the result
     * @return the job id (if {@code skipResult}) or the response lines passed to {@code handle}
     * @throws IllegalStateException if the client is not connected and authenticated
     * @throws TimeoutException      if no result arrived within {@code time} seconds
     */
    public List<String> sendAsyncApiCommand(String command, String arg,boolean skipResult,int time) {
        checkConnected();
        InboundClientHandler handler = (InboundClientHandler) channel.getPipeline().getLast();
        StringBuilder sb = new StringBuilder();
        if (command != null && !command.isEmpty()) {
            sb.append("bgapi ");
            sb.append(command);
        }
        if (arg != null && !arg.isEmpty()) {
            sb.append(' ');
            sb.append(arg);
        }
        String commandLine = sb.toString();
        log.info("fs command :{}",commandLine);
        if (skipResult) {
            String jobId = handler.sendAsyncCommand(channel, commandLine);
            return Arrays.asList(jobId);
        }

        // Create the callback before sending so it can be registered as soon as the job id is known.
        SyncCallback callback = new SyncCallback();
        String jobId = handler.sendAsyncCommand(channel, commandLine);
        map.put(jobId, callback);
        try {
            List<String> result = callback.get(time, TimeUnit.SECONDS);
            if (result == null) {
                throw TimeoutException.builder().msg("fs bgJob timeout").build();
            }
            return result;
        } finally {
            // Always unregister: covers timeout and interruption; a no-op when handle() already removed it.
            map.remove(jobId);
        }
    }


    /**
     * Delivers a background job result to the caller waiting in {@code sendAsyncApiCommand}.
     * <p>
     * The result may arrive before the waiting caller has registered its callback, so when no
     * callback is found the method polls briefly via {@link #sleep(int, String)} and tries again.
     *
     * @param key      job id the result belongs to
     * @param response result lines to hand to the waiting caller
     * @return {@code true} if a waiting callback was found and completed, {@code false} otherwise
     */
    public boolean handle(String key, List<String> response) {
        if (key == null) {
            // ConcurrentHashMap rejects null keys; nobody can be waiting on one.
            return false;
        }
        // Remove atomically instead of containsKey()+remove(): the waiter may time out and
        // unregister between the two calls, which used to end in a NullPointerException.
        SyncCallback callback = map.remove(key);
        if (callback == null && sleep(10, key)) {
            callback = map.remove(key);
        }
        if (callback == null) {
            return false;
        }
        callback.handle(response);
        return true;
    }

    /**
     * Polls for a callback registered under {@code key}, sleeping 2 ms before each check.
     *
     * @param times maximum number of polls
     * @param key   job id to look for
     * @return {@code true} as soon as the key is present, {@code false} if it never appeared
     */
    public boolean sleep(int times,String key){
        if (key == null) {
            // ConcurrentHashMap cannot contain a null key.
            return false;
        }
        boolean interrupted = false;
        try {
            for (int i = 1; i <= times; i++) {
                try {
                    Thread.sleep(2);
                } catch (InterruptedException e) {
                    // Keep polling (best effort, as before) but remember the interrupt.
                    interrupted = true;
                }
                if (map.containsKey(key)) {
                    return true;
                }
            }
            return false;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * One-shot rendezvous between a thread waiting for a job result and the thread delivering it.
     */
    private static class SyncCallback {
        private final CountDownLatch latch = new CountDownLatch( 1 );
        // volatile: get(timeout, unit) reads this after a timed-out await, where the latch
        // provides no happens-before edge with a concurrent handle().
        private volatile List<String> response;

        /**
         * Blocks until {@link #handle(List)} has been called.
         *
         * @return the delivered response
         * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted;
         *                          the thread's interrupt flag is restored first
         */
        List<String> get() {
            try {
                latch.await();
            }
            catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
                throw new RuntimeException( e );
            }
            return response;
        }

        /**
         * Blocks until {@link #handle(List)} has been called or the timeout elapses.
         *
         * @param timeout maximum time to wait
         * @param unit    unit of {@code timeout}
         * @return the delivered response, or {@code null} if none has been delivered
         * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted;
         *                          the thread's interrupt flag is restored first
         */
        List<String> get(long timeout, TimeUnit unit) {
            try {
                latch.await(timeout,unit);
            }
            catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
                throw new RuntimeException( e );
            }
            return response;
        }

        /**
         * Stores the response and releases any waiting thread.
         *
         * @param response the result to hand over
         */
        void handle( List<String> response )
        {
            this.response = response;
            latch.countDown();
        }
    }


    /**
     * Replaces the event subscription of this connection. Example values for {@code events}:
     * <pre>
     *   ALL
     *   CHANNEL_CREATE CHANNEL_DESTROY HEARTBEAT
     *   CUSTOM conference::maintenance
     *   CHANNEL_CREATE CHANNEL_DESTROY CUSTOM conference::maintenance sofia::register sofia::expire
     * </pre>
     * Each call replaces any subscription set by an earlier call.
     * <p>
     * Note: the current implementation can only process 'plain' events.
     *
     * @param format must be {@code plain}; any other value is rejected
     * @param events { all | space separated list of events }
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if {@code format} is not {@code plain} or the client is not connected
     */
    public CommandResponse setEventSubscriptions(String format, String events) {
        // temporary hack
        final boolean plainFormat = "plain".equals(format);
        if (!plainFormat) {
            throw new IllegalStateException("Only 'plain' event format is supported at present");
        }

        checkConnected();
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();

        // format is known to be the non-empty string "plain" at this point
        String commandLine = "event " + format;
        final boolean hasEvents = events != null && !events.isEmpty();
        if (hasEvents) {
            commandLine = commandLine + " " + events;
        }

        final EslMessage reply = eslHandler.sendSyncSingleLineCommand(channel, commandLine);
        return new CommandResponse(commandLine, reply);
    }


    /**
     * Cancels any existing event subscription by sending {@code noevents}.
     *
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse cancelEventSubscriptions() {
        checkConnected();
        final String commandLine = "noevents";
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        return new CommandResponse(commandLine, eslHandler.sendSyncSingleLineCommand(channel, commandLine));
    }
    /**
     * Sends the {@code noevents} command and wraps the reply.
     * <p>
     * NOTE(review): despite its name this method sends {@code noevents}, exactly like
     * {@link #cancelEventSubscriptions()} - looks like an unfinished copy; confirm the intent.
     *
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse sendevent() {
        checkConnected();
        final String commandLine = "noevents";
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        final EslMessage reply = eslHandler.sendSyncSingleLineCommand(channel, commandLine);
        return new CommandResponse(commandLine, reply);
    }

    /**
     * Adds an event filter to the filters already active on this connection. Any event header
     * can be used as a filter.
     * <p>
     * Event filters follow 'filter-in' semantics: once a filter is applied, only events matching
     * the filtered values are received. Several filters can be added to the same connection.
     * <p>
     * Example filters:
     * <pre>
     *    eventHeader        valueToFilter
     *    ----------------------------------
     *    Event-Name         CHANNEL_EXECUTE
     *    Channel-State      CS_NEW
     * </pre>
     *
     * @param eventHeader   to filter on
     * @param valueToFilter the value to match
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse addEventFilter(String eventHeader, String valueToFilter) {
        checkConnected();
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();

        // Each part is simply left out when its argument is null or empty.
        final String headerPart = (eventHeader == null || eventHeader.isEmpty()) ? "" : "filter " + eventHeader;
        final String valuePart = (valueToFilter == null || valueToFilter.isEmpty()) ? "" : " " + valueToFilter;
        final String commandLine = headerPart + valuePart;

        final EslMessage reply = eslHandler.sendSyncSingleLineCommand(channel, commandLine);
        return new CommandResponse(commandLine, reply);
    }

    /**
     * Removes an event filter from the filters active on this connection. See
     * {@link Client#addEventFilter(String, String)}.
     *
     * @param eventHeader   to remove
     * @param valueToFilter to remove
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse deleteEventFilter(String eventHeader, String valueToFilter) {
        checkConnected();
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();

        // Each part is simply left out when its argument is null or empty.
        final String headerPart = (eventHeader == null || eventHeader.isEmpty()) ? "" : "filter delete " + eventHeader;
        final String valuePart = (valueToFilter == null || valueToFilter.isEmpty()) ? "" : " " + valueToFilter;
        final String commandLine = headerPart + valuePart;

        final EslMessage reply = eslHandler.sendSyncSingleLineCommand(channel, commandLine);
        return new CommandResponse(commandLine, reply);
    }

    /**
     * Sends a {@link SendMsg} command to FreeSWITCH. This client requires that the {@link SendMsg}
     * has a call UUID parameter.
     *
     * @param sendMsg a {@link SendMsg} with call UUID
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse sendMessage(SendMsg sendMsg) {
        checkConnected();
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        final EslMessage reply = eslHandler.sendSyncMultiLineCommand(channel, sendMsg.getMsgLines());
        // NOTE(review): the event lock is added only after the message lines were handed to the
        // handler, so it may not be part of what was sent - confirm whether it belongs before the send.
        sendMsg.addEventLock();
        final CommandResponse result = new CommandResponse(sendMsg.toString(), reply);
        log.info("fs app :{}  result  :{}",sendMsg.getMsgLines(),result.getReplyText());
        return result;
    }
    /**
     * Sends a raw command through the handler's multi-line send and logs the reply text.
     *
     * @param command the command text to send
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse sendCommand(String command) {
        checkConnected();
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        final CommandResponse result =
                new CommandResponse(command, eslHandler.sendSyncMultiLineCommand(channel, command));
        log.info("fs command :{}  result  :{}",command,result.getReplyText());
        return result;
    }


    /**
     * Enables log output on this connection.
     *
     * @param level using the same values as in console.conf; when null or empty an empty
     *              command line is sent
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse setLoggingLevel(String level) {
        checkConnected();
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        final boolean hasLevel = level != null && !level.isEmpty();
        final String commandLine = hasLevel ? "log " + level : "";
        final EslMessage reply = eslHandler.sendSyncSingleLineCommand(channel, commandLine);
        return new CommandResponse(commandLine, reply);
    }

    /**
     * Disables any logging previously enabled with {@link #setLoggingLevel(String)} by sending
     * {@code nolog}.
     *
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse cancelLogging() {
        checkConnected();
        final String commandLine = "nolog";
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        return new CommandResponse(commandLine, eslHandler.sendSyncSingleLineCommand(channel, commandLine));
    }

    /**
     * Asks the server to end the session by sending {@code exit}. This method does not close
     * the Netty channel itself; see {@link #closeChannel()} for that.
     *
     * @return a {@link CommandResponse} with the server's response.
     * @throws IllegalStateException if the client is not connected and authenticated
     */
    public CommandResponse close() {
        checkConnected();
        final String commandLine = "exit";
        final InboundClientHandler eslHandler = (InboundClientHandler) channel.getPipeline().getLast();
        return new CommandResponse(commandLine, eslHandler.sendSyncSingleLineCommand(channel, commandLine));
    }

    /**
     * Closes the Netty channel if there is one and it is still open.
     *
     * @return the future of the close operation, or {@code null} when there was nothing to close
     */
    public ChannelFuture closeChannel() {
        if (channel == null || !channel.isOpen()) {
            return null;
        }
        return channel.close();
    }

    /*
     *  Internal observer of the ESL protocol
     */
    private final IEslProtocolListener protocolListener = new IEslProtocolListener() {
        /**
         * Records the outcome of the authentication handshake for the thread polling in connect().
         */
        @Override
        public void authResponseReceived(CommandResponse response) {
            // Store the result first and raise the "responded" flag last: connect() reads the
            // result as soon as it sees the flag, so the flag must not become visible earlier.
            authenticated = response.isOk();
            authenticationResponse = response;
            authenticatorResponded.set(true);
            log.debug("Auth response success={}, message=[{}]", authenticated, response.getReplyText());
        }

        /**
         * Dispatches an event to the listener bean registered for its name and subclass,
         * on the event listener thread pool.
         */
        @Override
        public void eventReceived(final EslEvent event,Client client) {
            Map<String, String> eventHeaders = event.getEventHeaders();
            eventListenerExecutor.execute(()->{
                try {
                    // Trace id for this event; referenced from logback.xml.
                    MDC.put("traceId", StrUtil.uuid());
                    // Catch exceptions from the business handler; otherwise the core thread is
                    // destroyed and recreated, which has two drawbacks:
                    // 1. extra overhead
                    // 2. thread usage can no longer be read off the thread creation numbering
                    IEslEventListener bean = (IEslEventListener) EslClient.getBeanByEventName(event.getEventName(),eventHeaders.get("Event-Subclass")).getBean();
                    bean.eventReceived(event, client);
                }catch (Exception e){
                    log.error("Exception",e);
                }finally {
                    // Do not leave this event's trace id behind on the pooled thread.
                    MDC.remove("traceId");
                }
            });
        }


        @Override
        public void disconnected() {
            log.info("Disconnected ..");
        }
    };


    /**
     * Guards the send methods.
     *
     * @throws IllegalStateException if {@link #canSend()} is {@code false}
     */
    private void checkConnected() {
        if (canSend()) {
            return;
        }
        throw new IllegalStateException("Not connected to FreeSWITCH Event Socket");
    }
}
